/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.loadtests;



import com.bff.gaia.unified.sdk.io.synthetic.SyntheticSourceOptions;

import com.bff.gaia.unified.sdk.io.synthetic.SyntheticStep;

import com.bff.gaia.unified.sdk.options.Default;

import com.bff.gaia.unified.sdk.options.Description;

import com.bff.gaia.unified.sdk.options.Validation;

import com.bff.gaia.unified.sdk.testutils.metrics.ByteMonitor;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.join.CoGbkResult;

import com.bff.gaia.unified.sdk.transforms.join.CoGroupByKey;

import com.bff.gaia.unified.sdk.transforms.join.KeyedPCollectionTuple;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.TupleTag;

import com.bff.gaia.unified.sdk.io.synthetic.SyntheticOptions;



import javax.annotation.Nullable;

import java.io.IOException;

import java.util.Optional;



/**

 * Load test for {@link CoGroupByKey} operation.

 *

 * <p>The purpose of this test is to measure {@link CoGroupByKey}'s behaviour in stressful

 * conditions. It uses synthetic sources and {@link SyntheticStep} which both can be parametrized to

 * generate keys and values of various size, impose delay (sleep or cpu burnout) in various moments

 * during the pipeline execution and provide some other performance challenges.

 *

 * <p>In addition, this test allows reiterating over the produced PCollection multiple times to see

 * how the pipeline behaves (e.g. whether caches work).

 *

 * <p>To run it manually, use the following command:

 *

 * <pre>

 *    ./gradlew :unified-sdks-java-load-tests:run -PloadTest.args='

 *      --iterations=1

 *      --sourceOptions={"numRecords":1000,...}

 *      --coSourceOptions={"numRecords":1000,...}

 *      --stepOptions={"outputRecordsPerInputRecord":2...}'

 *      -PloadTest.mainClass="com.bff.gaia.unified.sdk.loadtests.CoGroupByKeyLoadTest"

 * </pre>

 */

public class CoGroupByKeyLoadTest extends LoadTest<CoGroupByKeyLoadTest.Options> {

  /** Tag used to look up the values of the main input in a {@link CoGbkResult}. */
  private static final TupleTag<byte[]> INPUT_TAG = new TupleTag<>("input");

  /** Tag used to look up the values of the co-input in a {@link CoGbkResult}. */
  private static final TupleTag<byte[]> CO_INPUT_TAG = new TupleTag<>("co-input");

  /** Namespace handed to the base class and to the {@link ByteMonitor} of this test. */
  private static final String METRICS_NAMESPACE = "co_gbk";

  /** Pipeline options specific for this test. */
  public interface Options extends LoadTestOptions {

    /**
     * Returns the configuration of the synthetic co-source. The value is parsed as JSON into
     * {@link SyntheticSourceOptions} when the pipeline is built.
     */
    @Description("Options for synthetic co-source.")
    @Validation.Required
    String getCoSourceOptions();

    void setCoSourceOptions(String coSourceOptions);

    /** Returns the co-input window duration in seconds, or {@code null} if it was not set. */
    @Description("Co-input window duration. If not set global windows will be used.")
    @Nullable
    Long getCoInputWindowDurationSec();

    void setCoInputWindowDurationSec(Long coInputWindowDurationSec);

    /** Returns how many passes are made over the grouped values of each key (default 1). */
    @Description("Number of reiterations over per-key-grouped values to perform.")
    @Default.Integer(1)
    Integer getIterations();

    void setIterations(Integer iterations);
  }

  /**
   * Creates the test from command line arguments; instances are only created by {@link #main}.
   *
   * @param args command line arguments forwarded to the {@link LoadTest} base class
   * @throws IOException propagated from the {@link LoadTest} constructor
   */
  private CoGroupByKeyLoadTest(String[] args) throws IOException {
    super(args, Options.class, METRICS_NAMESPACE);
  }

  /**
   * Assembles the pipeline under test.
   *
   * <p>Both the input and the co-input are read from a synthetic source, passed through the
   * runtime monitor, windowed and, if present, passed through the synthetic step. The two
   * collections are then co-grouped by key, ungrouped again by {@link UngroupAndReiterate}, and
   * finally passed through a {@link ByteMonitor} and the runtime monitor once more.
   *
   * @throws IOException propagated from the helpers invoked while building the pipeline
   */
  @Override
  void loadTest() throws IOException {
    SyntheticSourceOptions coSourceOptions =
        SyntheticOptions.fromJsonString(options.getCoSourceOptions(), SyntheticSourceOptions.class);

    // The same optional step is applied to both branches, under different transform names.
    Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());

    PCollection<KV<byte[], byte[]>> input =
        pipeline.apply("Read input", readFromSource(sourceOptions));
    input = input.apply("Collect start time metrics (input)", ParDo.of(runtimeMonitor));
    // NOTE(review): the one-argument overload presumably uses the base options' window
    // duration - confirm against LoadTest.
    input = applyWindowing(input);
    input = applyStepIfPresent(input, "Synthetic step for input", syntheticStep);

    PCollection<KV<byte[], byte[]>> coInput =
        pipeline.apply("Read co-input", readFromSource(coSourceOptions));
    coInput = coInput.apply("Collect start time metrics (co-input)", ParDo.of(runtimeMonitor));
    coInput = applyWindowing(coInput, options.getCoInputWindowDurationSec());
    coInput = applyStepIfPresent(coInput, "Synthetic step for co-input", syntheticStep);

    KeyedPCollectionTuple.of(INPUT_TAG, input)
        .and(CO_INPUT_TAG, coInput)
        .apply("CoGroupByKey", CoGroupByKey.create())
        .apply("Ungroup and reiterate", ParDo.of(new UngroupAndReiterate(options.getIterations())))
        .apply(
            "Collect total bytes", ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")))
        .apply("Collect end time metrics", ParDo.of(runtimeMonitor));
  }

  /**
   * Flattens each co-grouped element back into key/value pairs.
   *
   * <p>The values of both inputs are traversed {@code iterations} times per element, but records
   * are emitted during the last traversal only, so every grouped value is output exactly once.
   */
  private static class UngroupAndReiterate
      extends DoFn<KV<byte[], CoGbkResult>, KV<byte[], byte[]>> {

    /** Number of passes over the grouped values of each key; always at least 1. */
    private final int iterations;

    /**
     * @param iterations number of passes over the grouped values of each key
     * @throws IllegalArgumentException if {@code iterations} is smaller than 1, because no pass
     *     would then be the emitting one and all records would be dropped silently
     */
    UngroupAndReiterate(int iterations) {
      if (iterations < 1) {
        throw new IllegalArgumentException(
            "iterations must be at least 1, but was " + iterations);
      }
      this.iterations = iterations;
    }

    /**
     * Traverses the values of both inputs {@code iterations} times and emits them on the last
     * pass.
     *
     * @param c context providing the co-grouped element and receiving the output records
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      byte[] key = c.element().getKey();
      CoGbkResult elementValue = c.element().getValue();

      Iterable<byte[]> inputs = elementValue.getAll(INPUT_TAG);
      Iterable<byte[]> coInputs = elementValue.getAll(CO_INPUT_TAG);

      // Reiterate "iterations" times, emit output only once (during the last pass).
      for (int i = 0; i < iterations; i++) {
        boolean lastPass = i == iterations - 1;
        traverse(c, key, inputs, lastPass);
        traverse(c, key, coInputs, lastPass);
      }
    }

    /**
     * Walks over all {@code values} once, emitting each of them paired with {@code key} when
     * {@code emit} is set. The traversal is performed even when nothing is emitted, since the
     * repeated iteration is what this load test exercises.
     */
    private void traverse(ProcessContext c, byte[] key, Iterable<byte[]> values, boolean emit) {
      for (byte[] value : values) {
        if (emit) {
          c.output(KV.of(key, value));
        }
      }
    }
  }

  /**
   * Entry point: builds the load test from {@code args} and runs it.
   *
   * @param args command line arguments holding the pipeline options
   * @throws IOException propagated from constructing or running the test
   */
  public static void main(String[] args) throws IOException {
    new CoGroupByKeyLoadTest(args).run();
  }
}